Library Imports
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
Template
# Build (or reuse) a local SparkSession for these join-exploration examples.
builder = SparkSession.builder
builder = builder.master("local")
builder = builder.appName("Exploring Joins")
builder = builder.config("spark.some.config.option", "some-value")
spark = builder.getOrCreate()

# Convenience handle to the underlying SparkContext.
sc = spark.sparkContext
Initial Datasets
# Toy `pets` dataset: each pet has an id, a breed_id foreign key, and a nickname.
pet_rows = [
    (1, 1, 'Bear'),
    (2, 1, 'Chewie'),
    (3, 2, 'Roger'),
]
pet_columns = ['id', 'breed_id', 'nickname']
pets = spark.createDataFrame(pet_rows, pet_columns)
pets.toPandas()
| | id | breed_id | nickname |
|---|---|---|---|
| 0 | 1 | 1 | Bear | 
| 1 | 2 | 1 | Chewie | 
| 2 | 3 | 2 | Roger | 
# Toy `breeds` dataset: one row per breed, with an average height per breed.
breed_rows = [
    (1, 'Pitbull', 10),
    (2, 'Corgie', 20),
]
breed_columns = ['breed_id', 'name', 'average_height']
breeds = spark.createDataFrame(breed_rows, breed_columns)
breeds.toPandas()
| | breed_id | name | average_height |
|---|---|---|---|
| 0 | 1 | Pitbull | 10 | 
| 1 | 2 | Corgie | 20 | 
Filter Pushdown
Filter pushdown improves performance by reducing the amount of data shuffled during any dataframe transformations.
Depending on your filter logic and where you place your filter code, your Spark code will behave differently.
Case #1: Filtering on Only One Side of the Join
# Case 1: filter on `nickname`, a column present only on the left (pets) side.
joined = pets.join(breeds, on='breed_id', how='left_outer')
df = joined.filter(F.col('nickname') == 'Chewie')
df.toPandas()
| | breed_id | id | nickname | name | average_height |
|---|---|---|---|---|---|
| 0 | 1 | 2 | Chewie | Pitbull | 10 | 
df.explain()
== Physical Plan ==
*(4) Project [breed_id#1L, id#0L, nickname#2, name#7, average_height#8L]
+- SortMergeJoin [breed_id#1L], [breed_id#6L], LeftOuter
   :- *(2) Sort [breed_id#1L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(breed_id#1L, 200)
   :     +- *(1) Filter (isnotnull(nickname#2) && (nickname#2 = Chewie))
   :        +- Scan ExistingRDD[id#0L,breed_id#1L,nickname#2]
   +- *(3) Sort [breed_id#6L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(breed_id#6L, 200)
         +- Scan ExistingRDD[breed_id#6L,name#7,average_height#8L]
What Happened:
Because the column nickname is only present in the left side of the join, only the left side of the join was filtered before the join.
Case #2: Filter on Both Sides of the Join
# Case 2: filter on `breed_id` (the join key) after a left outer join.
joined = pets.join(breeds, on='breed_id', how='left_outer')
# `where` is an alias of `filter`.
df = joined.where(F.col('breed_id') == 1)
df.toPandas()
| | breed_id | id | nickname | name | average_height |
|---|---|---|---|---|---|
| 0 | 1 | 1 | Bear | Pitbull | 10 | 
| 1 | 1 | 2 | Chewie | Pitbull | 10 | 
df.explain()
== Physical Plan ==
*(4) Project [breed_id#1L, id#0L, nickname#2, name#7, average_height#8L]
+- SortMergeJoin [breed_id#1L], [breed_id#6L], LeftOuter
   :- *(2) Sort [breed_id#1L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(breed_id#1L, 200)
   :     +- *(1) Filter (isnotnull(breed_id#1L) && (breed_id#1L = 1))
   :        +- Scan ExistingRDD[id#0L,breed_id#1L,nickname#2]
   +- *(3) Sort [breed_id#6L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(breed_id#6L, 200)
         +- Scan ExistingRDD[breed_id#6L,name#7,average_height#8L]
What Happened:
The column breed_id is present in both sides of the join, but only the left side was filtered before the join.
Case #3: Filter on Both Sides of the Join #2
# Case 3: same key filter, but with the default (inner) join type.
joined = pets.join(breeds, on='breed_id')
df = joined.filter(F.col('breed_id') == 1)
df.toPandas()
| | breed_id | id | nickname | name | average_height |
|---|---|---|---|---|---|
| 0 | 1 | 1 | Bear | Pitbull | 10 | 
| 1 | 1 | 2 | Chewie | Pitbull | 10 | 
df.explain()
== Physical Plan ==
*(5) Project [breed_id#1L, id#0L, nickname#2, name#7, average_height#8L]
+- *(5) SortMergeJoin [breed_id#1L], [breed_id#6L], Inner
   :- *(2) Sort [breed_id#1L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(breed_id#1L, 200)
   :     +- *(1) Filter (isnotnull(breed_id#1L) && (breed_id#1L = 1))
   :        +- Scan ExistingRDD[id#0L,breed_id#1L,nickname#2]
   +- *(4) Sort [breed_id#6L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(breed_id#6L, 200)
         +- *(3) Filter (isnotnull(breed_id#6L) && (breed_id#6L = 1))
            +- Scan ExistingRDD[breed_id#6L,name#7,average_height#8L]
What Happened:
The column breed_id is present in both sides of the join, and spark was able to figure out that it should perform a filter on both sides before the join.
Case #4: Filter on Both Sides of the Join, Filter Beforehand
# Case 4: explicitly pre-filter the right side, then filter again after the join.
filtered_breeds = breeds.filter(F.col('breed_id') == 1)
joined = pets.join(filtered_breeds, on='breed_id', how='left_outer')
df = joined.filter(F.col('breed_id') == 1)
df.toPandas()
| | breed_id | id | nickname | name | average_height |
|---|---|---|---|---|---|
| 0 | 1 | 1 | Bear | Pitbull | 10 | 
| 1 | 1 | 2 | Chewie | Pitbull | 10 | 
df.explain()
== Physical Plan ==
*(5) Project [breed_id#1L, id#0L, nickname#2, name#7, average_height#8L]
+- SortMergeJoin [breed_id#1L], [breed_id#6L], LeftOuter
   :- *(2) Sort [breed_id#1L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(breed_id#1L, 200)
   :     +- *(1) Filter (isnotnull(breed_id#1L) && (breed_id#1L = 1))
   :        +- Scan ExistingRDD[id#0L,breed_id#1L,nickname#2]
   +- *(4) Sort [breed_id#6L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(breed_id#6L, 200)
         +- *(3) Filter (isnotnull(breed_id#6L) && (breed_id#6L = 1))
            +- Scan ExistingRDD[breed_id#6L,name#7,average_height#8L]
What Happened:
The column breed_id is present in both sides of the join, and both sides were filtered before the join.
Summary
- To improve join performance, we should always try to push filters before the joins.
- Spark might be smart enough to figure out that a filter can be performed on both sides of the join, but not always.
- You should always check whether your Spark DAG is performant during a join, and whether any filters can be pushed before the joins.